/**
 * FileName: StructuredStreamingRuleFilter
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/02/22 10:25
 * Description: Reads streaming data from Kafka or HDFS, applies the rule
 *              filters and actions defined in external configuration, and
 *              persists the filtered result to the matching sink.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.RuleActionProcessImpl;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StructuredStreamingRuleFilter {

	// TODO(review): externalize these settings (checkpointLocation, topic,
	// path, kafka.bootstrap.servers) into the external configuration file.
	private static final String SOURCE_PATH = "/test_szj/source_data";

	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet7";

	/**
	 * Entry point. Reads a stream from Kafka or HDFS (selected by args[0]),
	 * pushes it through the rule/action process chain, and writes the filtered
	 * result to the matching sink (Kafka topic "savetopic", or HDFS text files
	 * under {@link #SAVE_PATH}). Blocks until the streaming query terminates.
	 *
	 * @param args args[0] must be "kafka" or "hdfs" (case-insensitive);
	 *             anything else terminates the JVM with exit code 1
	 */
	public static void main(String[] args) {

		if (args.length < 1) {
			System.err.println("No data source selected !(HDFS/kafka)");
			System.exit(1);
		}

		SparkSession spark = SparkSession
				.builder()
				.appName("HDFSJoinHDFSApp")
				.getOrCreate();

		// equalsIgnoreCase avoids the locale-sensitive toUpperCase()
		// comparison (e.g. the Turkish dotless-i problem) and evaluates the
		// selector exactly once.
		boolean fromKafka = "kafka".equalsIgnoreCase(args[0]);
		boolean fromHdfs = "hdfs".equalsIgnoreCase(args[0]);

		Dataset<Row> dataset;
		if (fromKafka) {
			// Source: Kafka topic "stopic", brokers/offsets from external config.
			dataset = spark
					.readStream()
					.format("kafka")
					.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
					.option("subscribe", "stopic")
					.option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
					.load();
		} else if (fromHdfs) {
			// Source: text files arriving under SOURCE_PATH, oldest first.
			dataset = spark
					.readStream()
					.option("latestFirst", "false")
					.option("fileNameOnly", "false")
					.text(SOURCE_PATH);
		} else {
			System.err.println("Only 'kafka' or 'hdfs'!");
			System.exit(1);
			return; // unreachable after exit(1); satisfies definite assignment of 'dataset'
		}

		// Apply the externally configured rule filter / action chain.
		Dataset<Row> filterResultDataset = ProcessChain
				.setSourceData(dataset)
				.addProcess(new RuleActionProcessImpl())
				.execute();

		// Configure exactly ONE sink. DataStreamWriter is mutable, so the
		// original code's unconditional text-sink configuration clobbered the
		// Kafka sink options, and its two .start() calls launched two
		// identical queries writing duplicate output.
		DataStreamWriter<Row> rowDataStreamWriter = filterResultDataset
				.writeStream()
				.outputMode(OutputMode.Update());
		if (fromKafka) {
			rowDataStreamWriter = rowDataStreamWriter
					.format("kafka")
					.option("checkpointLocation", "hdfs://192.168.70.21:9000/test_szj/checkpoint")
					.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
					.option("topic", "savetopic");
		} else {
			// NOTE(review): checkpointLocation and path are the same directory
			// here; a dedicated checkpoint dir would keep checkpoint state out
			// of the output data — confirm before changing the deployed path.
			rowDataStreamWriter = rowDataStreamWriter
					.format("text")
					.option("checkpointLocation", SAVE_PATH)
					.option("path", SAVE_PATH);
		}

		// Start the query exactly once and block until it terminates.
		StreamingQuery query = rowDataStreamWriter.start();
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// Surface the failure; a production job should log this and exit
			// non-zero rather than just print the stack trace.
			e.printStackTrace();
		}
	}

}